package com.open.source.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

/**
 * 手动提交,提交成功务必别忘记调用 acknowledgment.acknowledge();
 *
 * @author ZonLen since on 2022/7/29 下午12:22
 */

@Slf4j
public abstract class AbstractKafkaManualCommitConsumerListener extends
    AbstractKafkaConsumerListener implements
    AcknowledgingMessageListener<String, String> {


  @Override
  protected void configureConsumer(ContainerProperties containerProperties) {
    containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    containerProperties.getKafkaConsumerProperties()
        .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  }

}
